823e9cf358df7d6131b8db4ddd6cb9ade4e7baf5,src/org/jgroups/protocols/UNICAST.java,UNICAST,handleDataReceived,#Address#number#number#boolean#Message#,487

Before Change


                return;
        }

        if(!added && !win.hasMessagesToRemove()) { // no ack if we didn't add the msg (e.g. duplicate)
            return;
        }

        final AtomicBoolean processing=win.getProcessing();
        if(!processing.compareAndSet(false, true)) {
            return;
        }

        // Try to remove (from the AckReceiverWindow) as many messages as possible as pass them up
        boolean released_processing=false;
        int num_regular_msgs_removed=0;

        // Prevents concurrent passing up of messages by different threads (http://jira.jboss.com/jira/browse/JGRP-198);
        // this is all the more important once we have a threadless stack (http://jira.jboss.com/jira/browse/JGRP-181),
        // where lots of threads can come up to this point concurrently, but only 1 is allowed to pass at a time
        // We *can* deliver messages from *different* senders concurrently, e.g. reception of P1, Q1, P2, Q2 can result in
        // delivery of P1, Q1, Q2, P2: FIFO (implemented by UNICAST) says messages need to be delivered only in the
        // order in which they were sent by their senders
        try {
            while(true) {
                Message m=win.remove(processing);
                if(m == null) {
                    released_processing=true;
                    return;
                }

                // discard OOB msg as it has already been delivered (http://jira.jboss.com/jira/browse/JGRP-377)
                if(m.isFlagSet(Message.OOB)) {
                    continue;
                }
                num_regular_msgs_removed++;
                sendAckForMessage(m);
                up_prot.up(new Event(Event.MSG, m));
            }
        }
        finally {

After Change


            }
        }

        if(send_request_for_first_seqno != null) {
            sendRequestForFirstSeqno(send_request_for_first_seqno);
            return;
        }

        byte result=win.add2(seqno, msg); // win is guaranteed to be non-null if we get here
        boolean added=result > 0;
        num_msgs_received++;
        num_bytes_received+=msg.getLength();

        if(added && !msg.isFlagSet(Message.OOB))
            undelivered_msgs.incrementAndGet();

        // Cannot be replaced with if(!added), see https://jira.jboss.org/jira/browse/JGRP-1043 comment 15/Sep/09 06:57 AM

        // We *have* to do send the ACK, to cover the following scenario:
        // - A sends #3 to B
        // - B removes #3 and sends ACK(3) to A. B's next_to_remove is now 4
        // - B's ACK(3) to A is dropped by the network
        // - A keeps retransmitting #3 to B, until it gets an ACK(3)
        // -B will never ACK #3 if the 2 lines below are commented ==> endless retransmission of A's #3 !
        if(result == -1) { // only ack if seqno was < next_to_remove !
            sendAck(msg.getSrc(), seqno);
        }

        // message is passed up if OOB. Later, when remove() is called, we discard it. This affects ordering !
        // http://jira.jboss.com/jira/browse/JGRP-377
        if(msg.isFlagSet(Message.OOB)) {
            if(added)
                up_prot.up(new Event(Event.MSG, msg));
            Message oob_msg=win.removeOOBMessage();
            if(!(undelivered_msgs.get() > 0 && win.hasMessagesToRemove())) {
                if(oob_msg != null)
                    sendAckForMessage(oob_msg);
                return;
            }
        }

        final AtomicBoolean processing=win.getProcessing();
        if(!processing.compareAndSet(false, true)) {
            return;
        }

        // Try to remove (from the AckReceiverWindow) as many messages as possible as pass them up
        boolean released_processing=false;
        int num_regular_msgs_removed=0;

        // Prevents concurrent passing up of messages by different threads (http://jira.jboss.com/jira/browse/JGRP-198);
        // this is all the more important once we have a threadless stack (http://jira.jboss.com/jira/browse/JGRP-181),
        // where lots of threads can come up to this point concurrently, but only 1 is allowed to pass at a time
        // We *can* deliver messages from *different* senders concurrently, e.g. reception of P1, Q1, P2, Q2 can result in
        // delivery of P1, Q1, Q2, P2: FIFO (implemented by UNICAST) says messages need to be delivered only in the
        // order in which they were sent by their senders
        try {
            while(true) {
                List<Message> msgs=win.removeMany(processing);
                if(msgs.isEmpty()) {
                    released_processing=true;
                    return;
                }
                Message highest_removed=msgs.get(msgs.size() -1);
                sendAckForMessage(highest_removed); // guaranteed not to throw an exception !
                for(Message m: msgs) {
                    // discard OOB msg: it has already been delivered (http://jira.jboss.com/jira/browse/JGRP-377)
                    if(m.isFlagSet(Message.OOB))
                        continue;
                    num_regular_msgs_removed++;
                    try {
                        up_prot.up(new Event(Event.MSG, m));
                    }
                    catch(Throwable t) {
                        log.error("couldn't deliver message " + m, t);